diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/internal/database/models.go b/internal/database/models.go index 92304c9..da43b99 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -131,7 +131,7 @@ func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Ti case "10min": query = ` SELECT - to_char(bucket_start, 'YYYY-MM-DD HH24:MI:SS') AS date_time, + to_char(bucket_start + interval '10 minutes', 'YYYY-MM-DD HH24:MI:SS') AS date_time, ROUND(temp_c_x100/100.0, 2) AS temperature, ROUND(humidity_pct::numeric, 2) AS humidity, ROUND(pressure_hpa_x100/100.0, 2) AS pressure, @@ -143,7 +143,7 @@ func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Ti ROUND(rain_total_mm_x1000/1000.0, 3) AS rain_total FROM rs485_weather_10min WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3 - ORDER BY bucket_start` + ORDER BY bucket_start + interval '10 minutes'` case "30min": query = buildAggFrom10MinQuery("30 minutes") default: // 1hour @@ -205,8 +205,8 @@ func buildAggFrom10MinQuery(interval string) string { FROM base GROUP BY 1 ) - SELECT - to_char(grp, 'YYYY-MM-DD HH24:MI:SS') AS date_time, + SELECT + to_char(grp + '` + interval + `'::interval, 'YYYY-MM-DD HH24:MI:SS') AS date_time, ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature, ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity, ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure, @@ -219,7 +219,7 @@ func buildAggFrom10MinQuery(interval string) string { ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv, ROUND((rain_total_max/1000.0)::numeric, 3) AS rain_total FROM g - ORDER BY grp` + ORDER BY grp + '` + interval + `'::interval` } // buildWeatherDataQuery 构建天气数据查询SQL diff --git a/internal/tools/backfill.go b/internal/tools/backfill.go index bc1ae48..471911b 100644 --- a/internal/tools/backfill.go +++ b/internal/tools/backfill.go @@ -93,9 +93,15 @@ func RunBackfill10Min(ctx context.Context, opts BackfillOptions) error { prevTS = time.Time{} } - // 计算该样本所在桶(CST对齐) - bucketLocal := ts.In(loc).Truncate(time.Duration(opts.BucketMinutes) * time.Minute) - bucketStart := time.Date(bucketLocal.Year(), bucketLocal.Month(), bucketLocal.Day(), bucketLocal.Hour(), bucketLocal.Minute(), 0, 0, loc) + // 计算该样本所在桶(CST对齐) + // 改为左开右闭 (left-open, right-closed):恰好落在边界的样本归入“结束于该边界”的桶 + localTs := ts.In(loc) + floor := localTs.Truncate(time.Duration(opts.BucketMinutes) * time.Minute) + bucketStart := floor + if localTs.Equal(floor) { + bucketStart = floor.Add(-time.Duration(opts.BucketMinutes) * time.Minute) + } + bucketStart = time.Date(bucketStart.Year(), bucketStart.Month(), bucketStart.Day(), bucketStart.Hour(), bucketStart.Minute(), 0, 0, loc) if _, ok := buckets[stationID]; !ok { buckets[stationID] = make(map[time.Time]*agg) diff --git a/main.go b/main.go new file mode 100644 index 0000000..a4bf169 --- /dev/null +++ b/main.go @@ -0,0 +1,236 @@ +package main + +import ( + "bufio" + "encoding/hex" + "fmt" + "io" + "log" + "net" + "os" + "path/filepath" + "strings" + "sync" + "time" + "unicode/utf8" + + "weatherstation/config" + "weatherstation/model" +) + +type UTF8Writer struct { + w io.Writer +} + +func NewUTF8Writer(w io.Writer) *UTF8Writer { + return &UTF8Writer{w: w} +} + +func (w *UTF8Writer) Write(p []byte) (n int, err error) { + if utf8.Valid(p) { + return w.w.Write(p) + } + s := string(p) + s = strings.ToValidUTF8(s, "") + return w.w.Write([]byte(s)) +} + +var ( + logFile *os.File + logFileMutex sync.Mutex + currentLogDay int +) + +func getLogFileName() string { + currentTime := time.Now() + return filepath.Join("log", fmt.Sprintf("%s.log", currentTime.Format("2006-01-02"))) +} + +func openLogFile() (*os.File, error) { + logDir := "log" + if _, err := os.Stat(logDir); os.IsNotExist(err) { + os.MkdirAll(logDir, 0755) + } + + logFileName := getLogFileName() + return os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) +} + +func setupLogger() { + var err error + logFile, err = openLogFile() + if err != nil { + log.Fatalf("无法创建日志文件: %v", err) + } + + currentLogDay = time.Now().Day() + + bufferedWriter := bufio.NewWriter(logFile) + utf8Writer := NewUTF8Writer(bufferedWriter) + + go func() { + for { + time.Sleep(1 * time.Second) + + logFileMutex.Lock() + bufferedWriter.Flush() + + now := time.Now() + if now.Day() != currentLogDay { + oldLogFile := logFile + logFile, err = openLogFile() + if err != nil { + log.Printf("无法创建新日志文件: %v", err) + } else { + oldLogFile.Close() + currentLogDay = now.Day() + bufferedWriter = bufio.NewWriter(logFile) + utf8Writer = NewUTF8Writer(bufferedWriter) + log.SetOutput(io.MultiWriter(os.Stdout, utf8Writer)) + log.Println("日志文件已轮转") + } + } + logFileMutex.Unlock() + } + }() + + multiWriter := io.MultiWriter(os.Stdout, utf8Writer) + log.SetOutput(multiWriter) + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) +} + +func startUDP() { + cfg := config.GetConfig() + err := model.InitDB() + if err != nil { + log.Fatalf("初始化数据库失败: %v", err) + } + defer model.CloseDB() + addr := fmt.Sprintf(":%d", cfg.Server.UDPPort) + conn, err := net.ListenPacket("udp", addr) + if err != nil { + log.Fatalf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err) + } + defer conn.Close() + log.Printf("UDP服务器已启动,监听端口 %d...", cfg.Server.UDPPort) + buffer := make([]byte, 2048) + for { + n, addr, err := conn.ReadFrom(buffer) + if err != nil { + log.Printf("读取数据错误: %v", err) + continue + } + rawData := buffer[:n] + log.Printf("从 %s 接收到 %d 字节数据", addr.String(), n) + + hexDump := hexDump(rawData) + log.Printf("原始码流(十六进制):\n%s", hexDump) + asciiDump := asciiDump(rawData) + log.Printf("ASCII码:\n%s", asciiDump) + + // 首先尝试解析为WH65LP数据 + if len(rawData) == 25 && rawData[0] == 0x24 { + wh65lpData, err := model.ParseWH65LPData(rawData) + if err != nil { + log.Printf("解析WH65LP数据失败: %v", err) + } else { + log.Println("成功解析WH65LP气象站数据:") + log.Println(wh65lpData) + + // 更新内存中的设备信息 + model.UpdateDeviceInMemory(wh65lpData.StationID, addr, model.DeviceTypeWH65LP) + // 注册设备到数据库 + err = model.RegisterDeviceInDB(wh65lpData.StationID, addr) + if err != nil { + log.Printf("注册设备失败: %v", err) + } + log.Printf("设备 %s 已注册,IP: %s", wh65lpData.StationID, addr.String()) + + // 保存数据 + err = model.SaveWH65LPData(wh65lpData, rawData) + if err != nil { + log.Printf("保存数据到数据库失败: %v", err) + } else { + log.Printf("数据已成功保存到数据库") + } + continue + } + } + + // 如果不是WH65LP数据,尝试解析为ECOWITT数据 + data := string(rawData) + weatherData, err := model.ParseWeatherData(data) + if err != nil { + log.Printf("解析ECOWITT数据失败: %v", err) + continue + } + + log.Println("成功解析ECOWITT气象站数据:") + log.Println(weatherData) + + if weatherData.StationID != "" { + // 更新内存中的设备信息 + model.UpdateDeviceInMemory(weatherData.StationID, addr, model.DeviceTypeEcowitt) + // 注册设备到数据库 + err = model.RegisterDeviceInDB(weatherData.StationID, addr) + if err != nil { + log.Printf("注册设备失败: %v", err) + } + log.Printf("设备 %s 已注册,IP: %s", weatherData.StationID, addr.String()) + } else { + log.Printf("警告: 收到的数据没有站点ID") + } + + err = model.SaveWeatherData(weatherData, data) + if err != nil { + log.Printf("保存数据到数据库失败: %v", err) + } else { + log.Printf("数据已成功保存到数据库") + } + } +} + +func main() { + setupLogger() + startUDP() // 直接运行UDP服务器,不再使用goroutine +} + +func hexDump(data []byte) string { + var result strings.Builder + for i := 0; i < len(data); i += 16 { + end := i + 16 + if end > len(data) { + end = len(data) + } + chunk := data[i:end] + hexStr := hex.EncodeToString(chunk) + for j := 0; j < len(hexStr); j += 2 { + if j+2 <= len(hexStr) { + result.WriteString(strings.ToUpper(hexStr[j : j+2])) + result.WriteString(" ") + } + } + result.WriteString("\n") + } + return result.String() +} + +func asciiDump(data []byte) string { + var result strings.Builder + for i := 0; i < len(data); i += 64 { + end := i + 64 + if end > len(data) { + end = len(data) + } + chunk := data[i:end] + for _, b := range chunk { + if b >= 32 && b <= 126 { + result.WriteByte(b) + } else { + result.WriteString(".") + } + } + result.WriteString("\n") + } + return result.String() +} diff --git a/server/udp_server.go b/server/udp_server.go new file mode 100644 index 0000000..105f1a9 --- /dev/null +++ b/server/udp_server.go @@ -0,0 +1,61 @@ +package server + +import ( + "log" + "net" + "weatherstation/model" +) + +func StartUDPServer(port string) error { + addr, err := net.ResolveUDPAddr("udp", ":"+port) + if err != nil { + return err + } + + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return err + } + defer conn.Close() + + log.Printf("UDP服务器已启动,监听端口 %s", port) + + buffer := make([]byte, 1024) + for { + n, remoteAddr, err := conn.ReadFromUDP(buffer) + if err != nil { + log.Printf("读取UDP数据失败: %v", err) + continue + } + + // 处理接收到的数据 + go handleUDPData(buffer[:n], remoteAddr) + } +} + +func handleUDPData(data []byte, addr *net.UDPAddr) { + // 尝试解析为WH65LP数据 + weatherData, err := model.ParseUDP10006Data(data) + if err != nil { + log.Printf("解析数据失败: %v", err) + return + } + + // 更新内存中的设备信息 + model.UpdateDeviceInMemory(weatherData.StationID, addr, model.DeviceTypeWH65LP) + + // 注册设备到数据库 + err = model.RegisterDeviceInDB(weatherData.StationID, addr) + if err != nil { + log.Printf("注册设备失败: %v", err) + } + + // 保存数据 + err = model.SaveWH65LPData(weatherData, data) + if err != nil { + log.Printf("保存数据失败: %v", err) + return + } + + log.Printf("成功接收并保存来自 %s 的WH65LP数据:\n%s", addr.String(), weatherData.String()) +} diff --git a/参考页面.html b/参考页面.html new file mode 100644 index 0000000..5ab2f8a --- /dev/null +++ b/参考页面.html @@ -0,0 +1 @@ +Hello \ No newline at end of file